/**
 * Copyright (C) 2010-2013 Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.remoting.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.remoting.ChannelEventListener;
import com.alibaba.rocketmq.remoting.InvokeCallback;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.RemotingClient;
import com.alibaba.rocketmq.remoting.common.Pair;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;
import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;

/**
 * Remoting client implementation.
 * 
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-7-13
 */
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
	// Logger shared by the remoting module, obtained by the name RemotingHelper.RemotingLogName.
	private static final Logger log = LoggerFactory.getLogger(RemotingHelper.RemotingLogName);

	// Maximum time, in milliseconds, to wait in tryLock() for either of the locks declared below.
	private static final long LockTimeoutMillis = 3000;

	// Client-side settings (thread counts, socket buffer sizes, timeouts) read throughout this class.
	private final NettyClientConfig nettyClientConfig;
	// Bootstrap configured in start() and used by createChannel() to open connections.
	private final Bootstrap bootstrap = new Bootstrap();
	// Event loop group handed to the bootstrap; created in the constructor with a single thread.
	private final EventLoopGroup eventLoopGroupWorker;
	// Executor group the pipeline handlers are registered with; assigned in start(), so null before that.
	private DefaultEventExecutorGroup defaultEventExecutorGroup;

	// Guards the modifications of channelTables made by createChannel() and the closeChannel() overloads.
	private final Lock lockChannelTables = new ReentrantLock();
	// Connection (or in-flight connection attempt) per remote address.
	private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();

	// Daemon timer; start() schedules the periodic scanResponseTable() task on it.
	private final Timer timer = new Timer("ClientHouseKeepingService", true);

	// Name server related state.
	// Candidate name server addresses, replaced by updateNameServerAddressList().
	private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
	// Name server address most recently selected by getAndCreateNameserverChannel().
	private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
	// Cursor used to rotate through namesrvAddrList; starts at the value returned by initValueIndex().
	private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
	// Serializes the selection of a name server in getAndCreateNameserverChannel().
	private final Lock lockNamesrvChannel = new ReentrantLock();

	// Executor for handling callbacks; also the fallback executor in registerProcessor().
	private final ExecutorService publicExecutor;

	// Optional listener; when non-null, pipeline events are published via putNettyEvent().
	private final ChannelEventListener channelEventListener;

	// Optional hook invoked before requests are sent and after synchronous responses arrive.
	private RPCHook rpcHook;

	/**
	 * Holds the {@link ChannelFuture} of one connection attempt and offers
	 * convenience checks on the channel that future exposes.
	 */
	class ChannelWrapper {
		private final ChannelFuture channelFuture;

		public ChannelWrapper(ChannelFuture channelFuture) {
			this.channelFuture = channelFuture;
		}

		/**
		 * @return {@code true} when the future exposes a channel and that channel is active
		 */
		public boolean isOK() {
			final Channel current = getChannel();
			return current != null && current.isActive();
		}

		/**
		 * @return whether the wrapped channel currently reports itself as writable
		 */
		public boolean isWriteable() {
			return getChannel().isWritable();
		}

		private Channel getChannel() {
			return this.channelFuture.channel();
		}

		/**
		 * @return the future this wrapper was created with
		 */
		public ChannelFuture getChannelFuture() {
			return this.channelFuture;
		}
	}

	/**
	 * Inbound handler that passes every decoded {@link RemotingCommand} to
	 * {@code processMessageReceived} of the enclosing client.
	 */
	class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

		@Override
		protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
			NettyRemotingClient.this.processMessageReceived(ctx, msg);
		}
	}

	/**
	 * Duplex handler that logs connection life-cycle operations, removes closed
	 * or failed channels from the channel table and, when a
	 * {@link ChannelEventListener} is configured, publishes a {@link NettyEvent}
	 * for each of them.
	 */
	class NettyConnetManageHandler extends ChannelDuplexHandler {
		@Override
		public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
			final String local = localAddress == null ? "UNKNOW" : localAddress.toString();
			final String remote = remoteAddress == null ? "UNKNOW" : remoteAddress.toString();
			log.info("NETTY CLIENT PIPELINE: CONNECT  {} => {}", local, remote);
			super.connect(ctx, remoteAddress, localAddress, promise);

			// Publish the null-safe string computed above; calling
			// remoteAddress.toString() here would throw when remoteAddress is null.
			publishEvent(NettyEventType.CONNECT, remote, ctx.channel());
		}

		@Override
		public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
			final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
			log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
			closeChannel(ctx.channel());
			super.disconnect(ctx, promise);

			// A disconnect is reported to the listener as a CLOSE event.
			publishEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel());
		}

		@Override
		public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
			final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
			log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
			closeChannel(ctx.channel());
			super.close(ctx, promise);

			publishEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel());
		}

		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
			final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
			log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
			log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
			closeChannel(ctx.channel());

			publishEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel());
		}

		@Override
		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
			if (evt instanceof IdleStateEvent) {
				final IdleStateEvent event = (IdleStateEvent) evt;
				// Only the ALL_IDLE state closes the channel; other idle states are just forwarded.
				if (event.state().equals(IdleState.ALL_IDLE)) {
					final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
					log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
					closeChannel(ctx.channel());

					publishEvent(NettyEventType.IDLE, remoteAddress, ctx.channel());
				}
			}

			ctx.fireUserEventTriggered(evt);
		}

		/**
		 * Queues a {@link NettyEvent} for the configured listener; does nothing
		 * when the client was created without a listener.
		 */
		private void publishEvent(final NettyEventType type, final String remoteAddress, final Channel channel) {
			if (NettyRemotingClient.this.channelEventListener != null) {
				NettyRemotingClient.this.putNettyEvent(new NettyEvent(type, remoteAddress, channel));
			}
		}
	}

	/**
	 * Produces the random starting value for the name server cursor.
	 *
	 * @return a non-negative value smaller than 999
	 */
	private static int initValueIndex() {
		final int bounded = new Random().nextInt() % 999;
		final int nonNegative = Math.abs(bounded);

		return nonNegative % 999;
	}

	/**
	 * Creates a client without a channel event listener.
	 *
	 * @param nettyClientConfig client settings
	 */
	public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
		this(nettyClientConfig, null);
	}

	/**
	 * Creates a client, its callback thread pool and its single-threaded
	 * selector event loop group. No connection is opened here.
	 *
	 * @param nettyClientConfig client settings
	 * @param channelEventListener listener for channel events, may be {@code null}
	 */
	public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
			final ChannelEventListener channelEventListener) {
		super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
		this.nettyClientConfig = nettyClientConfig;
		this.channelEventListener = channelEventListener;

		// A non-positive configured size falls back to 4 callback threads.
		final int configuredThreads = nettyClientConfig.getClientCallbackExecutorThreads();
		final int callbackThreads = configuredThreads > 0 ? configuredThreads : 4;

		final ThreadFactory callbackThreadFactory = new ThreadFactory() {
			private AtomicInteger counter = new AtomicInteger(0);

			@Override
			public Thread newThread(Runnable task) {
				final int sequence = this.counter.incrementAndGet();
				return new Thread(task, "NettyClientPublicExecutor_" + sequence);
			}
		};
		this.publicExecutor = Executors.newFixedThreadPool(callbackThreads, callbackThreadFactory);

		final ThreadFactory selectorThreadFactory = new ThreadFactory() {
			private AtomicInteger counter = new AtomicInteger(0);

			@Override
			public Thread newThread(Runnable task) {
				final String threadName = String.format("NettyClientSelector_%d", this.counter.incrementAndGet());
				return new Thread(task, threadName);
			}
		};
		this.eventLoopGroupWorker = new NioEventLoopGroup(1, selectorThreadFactory);
	}

	/**
	 * Prepares the client for use: creates the handler executor group,
	 * configures the bootstrap (socket options and channel pipeline), schedules
	 * the periodic response-table scan and, when a channel event listener is
	 * configured, starts the event executor.
	 */
	@Override
	public void start() {
		this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
				nettyClientConfig.getClientWorkerThreads(), //
				new ThreadFactory() {

					private AtomicInteger threadIndex = new AtomicInteger(0);

					@Override
					public Thread newThread(Runnable r) {
						return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
					}
				});

		// The configured bootstrap lives in the field, so the value returned by
		// the fluent call chain does not need to be kept in a local variable.
		this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
				//
				.option(ChannelOption.TCP_NODELAY, true)
				//
				.option(ChannelOption.SO_KEEPALIVE, false)
				//
				.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
				//
				.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
				//
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					public void initChannel(SocketChannel ch) throws Exception {
						// All handlers are registered with defaultEventExecutorGroup.
						ch.pipeline().addLast(//
								defaultEventExecutorGroup, //
								new NettyEncoder(), //
								new NettyDecoder(), //
								new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //
								new NettyConnetManageHandler(), //
								new NettyClientHandler());
					}
				});

		// First run after 3 seconds, then once per second.
		this.timer.scheduleAtFixedRate(new TimerTask() {

			@Override
			public void run() {
				try {
					// scanResponseTable() is inherited; presumably it expires pending
					// responses whose wait has timed out and runs their callbacks
					// (see NettyRemotingAbstract to confirm).
					NettyRemotingClient.this.scanResponseTable();
				} catch (Exception e) {
					// Swallowed on purpose so that one failed scan does not cancel the timer task.
					log.error("scanResponseTable exception", e);
				}
			}
		}, 1000 * 3, 1000);

		if (this.channelEventListener != null) {
			this.nettyEventExecuter.start();
		}
	}

	/**
	 * Stops the client: cancels the timer, closes and forgets every known
	 * channel, shuts down the event loop group, the event executor and the
	 * handler executor group, and finally the callback thread pool. Failures
	 * are logged and never propagated to the caller.
	 */
	@Override
	public void shutdown() {
		try {
			this.timer.cancel();

			for (ChannelWrapper cw : this.channelTables.values()) {
				// A null address makes closeChannel derive it from the channel itself.
				this.closeChannel(null, cw.getChannel());
			}

			this.channelTables.clear();

			this.eventLoopGroupWorker.shutdownGracefully();

			if (this.nettyEventExecuter != null) {
				this.nettyEventExecuter.shutdown();
			}

			// Null when start() was never called.
			if (this.defaultEventExecutorGroup != null) {
				this.defaultEventExecutorGroup.shutdownGracefully();
			}
		} catch (Exception e) {
			log.error("NettyRemotingClient shutdown exception, ", e);
		}

		// Handled separately so the callback pool is shut down even if the steps above failed.
		if (this.publicExecutor != null) {
			try {
				this.publicExecutor.shutdown();
			} catch (Exception e) {
				log.error("NettyRemotingClient shutdown exception, ", e);
			}
		}
	}

	/**
	 * Returns a channel for the given address, reusing the one recorded in the
	 * channel table when it is usable and connecting otherwise.
	 *
	 * @param addr remote address; {@code null} selects a name server instead
	 * @return the channel, or {@code null} when none could be obtained
	 * @throws InterruptedException if interrupted while waiting for a lock
	 */
	private Channel getAndCreateChannel(final String addr) throws InterruptedException {
		if (addr == null) {
			return getAndCreateNameserverChannel();
		}

		// Reuse the channel already recorded for this address when it is still usable.
		final ChannelWrapper known = this.channelTables.get(addr);
		final boolean reusable = known != null && known.isOK();

		// Otherwise open a connection; createChannel records it in the table for later calls.
		return reusable ? known.getChannel() : this.createChannel(addr);
	}

	/**
	 * Returns a channel to a name server. The channel of the currently chosen
	 * name server is reused when it is usable; otherwise the configured
	 * addresses are tried in rotation, at most once each, until one connects.
	 *
	 * @return the channel, or {@code null} when no address is configured, no
	 *         name server could be reached, or the lock could not be acquired
	 *         within {@code LockTimeoutMillis}
	 * @throws InterruptedException if interrupted while waiting for a lock
	 */
	private Channel getAndCreateNameserverChannel() throws InterruptedException {
		// Fast path without locking.
		String addr = this.namesrvAddrChoosed.get();
		if (addr != null) {
			ChannelWrapper cw = this.channelTables.get(addr);
			if (cw != null && cw.isOK()) {
				return cw.getChannel();
			}
		}

		final List<String> addrList = this.namesrvAddrList.get();
		if (this.lockNamesrvChannel.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
			try {
				// Re-check under the lock: another thread may have connected in the meantime.
				addr = this.namesrvAddrChoosed.get();
				if (addr != null) {
					ChannelWrapper cw = this.channelTables.get(addr);
					if (cw != null && cw.isOK()) {
						return cw.getChannel();
					}
				}

				if (addrList != null && !addrList.isEmpty()) {
					for (int i = 0; i < addrList.size(); i++) {
						// Take the remainder before the absolute value:
						// Math.abs(Integer.MIN_VALUE) is still negative and would
						// yield a negative list index once the counter wraps around.
						final int index = Math.abs(this.namesrvIndex.incrementAndGet() % addrList.size());
						final String newAddr = addrList.get(index);

						this.namesrvAddrChoosed.set(newAddr);
						final Channel channelNew = this.createChannel(newAddr);
						if (channelNew != null) {
							return channelNew;
						}
					}
				}
			} catch (InterruptedException e) {
				// Declared by this method and propagated by the non-name-server
				// path as well; do not hide an interruption behind a null result.
				throw e;
			} catch (Exception e) {
				log.error("getAndCreateNameserverChannel: create name server channel exception", e);
			} finally {
				this.lockNamesrvChannel.unlock();
			}
		} else {
			log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LockTimeoutMillis);
		}

		return null;
	}

	/**
	 * Returns a usable channel for the address, starting an asynchronous
	 * connection attempt when the channel table has no usable or pending entry,
	 * and then waiting up to the configured connect timeout for the attempt to
	 * complete.
	 *
	 * @param addr remote address to connect to
	 * @return the connected channel, or {@code null} when the connection failed
	 *         or did not complete in time
	 * @throws InterruptedException if interrupted while waiting for the table lock
	 */
	private Channel createChannel(final String addr) throws InterruptedException {
		ChannelWrapper wrapper = this.channelTables.get(addr);
		if (wrapper != null && wrapper.isOK()) {
			return wrapper.getChannel();
		}

		// Nothing inside the critical section may block, so the connection is started asynchronously.
		if (!this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
			log.warn("createChannel: try to lock channel table, but timeout, {}ms", LockTimeoutMillis);
		} else {
			try {
				final boolean needConnect;
				wrapper = this.channelTables.get(addr);
				if (wrapper == null) {
					// No entry for this address yet.
					needConnect = true;
				} else if (wrapper.isOK()) {
					// Another thread connected in the meantime.
					return wrapper.getChannel();
				} else if (wrapper.getChannelFuture().isDone()) {
					// The previous attempt finished without producing an active channel.
					this.channelTables.remove(addr);
					needConnect = true;
				} else {
					// An attempt is still in flight; wait for it outside the lock.
					needConnect = false;
				}

				if (needConnect) {
					final ChannelFuture pending = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
					log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
					wrapper = new ChannelWrapper(pending);
					this.channelTables.put(addr, wrapper);
				}
			} catch (Exception e) {
				log.error("createChannel: create channel exception", e);
			} finally {
				this.lockChannelTables.unlock();
			}
		}

		if (wrapper == null) {
			return null;
		}

		final ChannelFuture future = wrapper.getChannelFuture();
		if (!future.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
			log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), future.toString());
			return null;
		}

		if (wrapper.isOK()) {
			log.info("createChannel: connect remote host[{}] success, {}", addr, future.toString());
			return wrapper.getChannel();
		}

		log.warn("createChannel: connect remote host[" + addr + "] failed, " + future.toString(), future.cause());
		return null;
	}

	/**
	 * Closes the channel and, when the channel table still maps the address to
	 * this very channel, removes that entry. The channel is closed even when
	 * the table entry is missing or refers to a newer channel.
	 *
	 * @param addr remote address; when {@code null} it is derived from the channel
	 * @param channel channel to close; {@code null} is ignored
	 */
	public void closeChannel(final String addr, final Channel channel) {
		if (null == channel)
			return;

		final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;

		try {
			if (this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
				try {
					boolean removeItemFromTable = true;
					final ChannelWrapper prevCW = this.channelTables.get(addrRemote);

					log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, (prevCW != null));

					if (null == prevCW) {
						log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
						removeItemFromTable = false;
					} else if (prevCW.getChannel() != channel) {
						// The table already holds a newer channel for this address; leave it in place.
						log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.", addrRemote);
						removeItemFromTable = false;
					}

					if (removeItemFromTable) {
						this.channelTables.remove(addrRemote);
						log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
					}

					RemotingUtil.closeChannel(channel);
				} catch (Exception e) {
					log.error("closeChannel: close the channel exception", e);
				} finally {
					this.lockChannelTables.unlock();
				}
			} else {
				log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LockTimeoutMillis);
			}
		} catch (InterruptedException e) {
			log.error("closeChannel exception", e);
			// This method cannot rethrow, so restore the flag for code further up the stack.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Looks the channel up in the channel table by identity and, when found,
	 * removes its entry and closes the channel. A channel that is not in the
	 * table is only logged, not closed.
	 *
	 * @param channel channel to close; {@code null} is ignored
	 */
	public void closeChannel(final Channel channel) {
		if (null == channel)
			return;

		try {
			if (this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
				try {
					// Iterate over entries rather than keys followed by get(key):
					// an entry removed in between (shutdown() clears the table
					// without this lock) would otherwise surface as a null wrapper.
					String addrRemote = null;
					for (Map.Entry<String, ChannelWrapper> entry : this.channelTables.entrySet()) {
						if (entry.getValue().getChannel() == channel) {
							addrRemote = entry.getKey();
							break;
						}
					}

					if (null == addrRemote) {
						// No table key is known here, so report the address taken from the channel.
						log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", RemotingHelper.parseChannelRemoteAddr(channel));
					} else {
						this.channelTables.remove(addrRemote);
						log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
						RemotingUtil.closeChannel(channel);
					}
				} catch (Exception e) {
					log.error("closeChannel: close the channel exception", e);
				} finally {
					this.lockChannelTables.unlock();
				}
			} else {
				log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LockTimeoutMillis);
			}
		} catch (InterruptedException e) {
			log.error("closeChannel exception", e);
			// This method cannot rethrow, so restore the flag for code further up the stack.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Registers the processor for a request code in the processor table.
	 *
	 * @param requestCode request code the processor handles
	 * @param processor processor to register
	 * @param executor executor to pair with the processor; when {@code null}
	 *        the client's callback executor is used
	 */
	@Override
	public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
		final ExecutorService effectiveExecutor = (executor != null) ? executor : this.publicExecutor;

		this.processorTable.put(requestCode, new Pair<NettyRequestProcessor, ExecutorService>(processor, effectiveExecutor));
	}

	/**
	 * Sends a request and waits for its response.
	 *
	 * @param addr remote address; {@code null} targets a name server
	 * @param request command to send
	 * @param timeoutMillis timeout passed on to {@code invokeSyncImpl}
	 * @return the response command
	 * @throws InterruptedException if interrupted while obtaining the channel or waiting
	 * @throws RemotingConnectException if no active channel could be obtained
	 * @throws RemotingSendRequestException if sending failed; the channel is closed first
	 * @throws RemotingTimeoutException if no response arrived in time; the channel is kept open
	 */
	@Override
	public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
		// Step 1: look up the channel for the address in the channel table, connecting first when needed.
		final Channel channel = this.getAndCreateChannel(addr);
		if (channel == null || !channel.isActive()) {
			this.closeChannel(addr, channel);
			throw new RemotingConnectException(addr);
		}

		try {
			// Step 2: let the registered hook, if any, see the outgoing request.
			if (this.rpcHook != null) {
				this.rpcHook.doBeforeRequest(addr, request);
			}

			// Step 3: send the request and wait for the response.
			final RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);

			// Step 4: let the registered hook, if any, see the response.
			if (this.rpcHook != null) {
				this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
			}
			return response;
		} catch (RemotingSendRequestException e) {
			log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
			this.closeChannel(addr, channel);
			throw e;
		} catch (RemotingTimeoutException e) {
			log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
			throw e;
		}
	}

	/**
	 * Sends a request asynchronously, handing the callback to {@code invokeAsyncImpl}.
	 *
	 * @param addr remote address; {@code null} targets a name server
	 * @param request command to send
	 * @param timeoutMillis timeout passed on to {@code invokeAsyncImpl}
	 * @param invokeCallback callback passed on to {@code invokeAsyncImpl}
	 * @throws RemotingConnectException if no active channel could be obtained
	 * @throws RemotingSendRequestException if sending failed; the channel is closed first
	 */
	@Override
	public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
		// Step 1: look up the channel for the address in the channel table, connecting first when needed.
		final Channel channel = this.getAndCreateChannel(addr);
		if (channel == null || !channel.isActive()) {
			this.closeChannel(addr, channel);
			throw new RemotingConnectException(addr);
		}

		try {
			// Step 2: let the registered hook, if any, see the outgoing request.
			if (this.rpcHook != null) {
				this.rpcHook.doBeforeRequest(addr, request);
			}

			// Step 3: hand the request and the callback to the shared asynchronous implementation.
			this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
		} catch (RemotingSendRequestException e) {
			log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
			this.closeChannel(addr, channel);
			throw e;
		}
	}

	/**
	 * Sends a request without expecting a response.
	 *
	 * @param addr remote address; {@code null} targets a name server
	 * @param request command to send
	 * @param timeoutMillis timeout passed on to {@code invokeOnewayImpl}
	 * @throws RemotingConnectException if no active channel could be obtained
	 * @throws RemotingSendRequestException if sending failed; the channel is closed first
	 */
	@Override
	public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
		final Channel channel = this.getAndCreateChannel(addr);
		if (channel == null || !channel.isActive()) {
			this.closeChannel(addr, channel);
			throw new RemotingConnectException(addr);
		}

		try {
			// Let the registered hook, if any, see the outgoing request.
			if (this.rpcHook != null) {
				this.rpcHook.doBeforeRequest(addr, request);
			}

			this.invokeOnewayImpl(channel, request, timeoutMillis);
		} catch (RemotingSendRequestException e) {
			log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
			this.closeChannel(addr, channel);
			throw e;
		}
	}

	/**
	 * @return the client's own callback thread pool, created in the constructor
	 */
	@Override
	public ExecutorService getCallbackExecutor() {
		return publicExecutor;
	}

	/**
	 * Replaces the name server address list when the given list differs from
	 * the current one in size or content. A {@code null} or empty list is
	 * ignored, so the current addresses are never cleared through this method.
	 *
	 * @param addrs new name server addresses; the list itself is neither
	 *        modified nor retained
	 */
	@Override
	public void updateNameServerAddressList(List<String> addrs) {
		if (addrs == null || addrs.isEmpty()) {
			return;
		}

		final List<String> old = this.namesrvAddrList.get();
		final boolean update = null == old //
				|| addrs.size() != old.size() //
				|| !old.containsAll(addrs);

		if (update) {
			// Shuffle a private copy: shuffling the argument in place would
			// reorder the caller's list and fail on unmodifiable lists.
			final List<String> shuffled = new ArrayList<String>(addrs);
			Collections.shuffle(shuffled);
			this.namesrvAddrList.set(shuffled);
		}
	}

	/**
	 * @return the listener given at construction time, possibly {@code null}
	 */
	@Override
	public ChannelEventListener getChannelEventListener() {
		return this.channelEventListener;
	}

	/**
	 * @return the current name server address list, or {@code null} when none was set
	 */
	public List<String> getNamesrvAddrList() {
		return this.namesrvAddrList.get();
	}

	/**
	 * @return the current name server address list, or {@code null} when none was set
	 */
	@Override
	public List<String> getNameServerAddressList() {
		return namesrvAddrList.get();
	}

	/**
	 * @return the registered RPC hook, or {@code null} when none was registered
	 */
	public RPCHook getRpcHook() {
		return this.rpcHook;
	}

	/**
	 * Sets the hook used by the invoke methods, replacing any previous one.
	 *
	 * @param rpcHook hook to use; {@code null} disables hook invocation
	 */
	@Override
	public void registerRPCHook(RPCHook rpcHook) {
		this.rpcHook = rpcHook;
	}

	/**
	 * @return the registered RPC hook, or {@code null} when none was registered
	 */
	@Override
	public RPCHook getRPCHook() {
		return rpcHook;
	}

	/**
	 * Reports whether the channel recorded for the address is writable.
	 *
	 * @param addr remote address to look up in the channel table
	 * @return the channel's writability when an active channel is recorded for
	 *         the address; {@code true} otherwise
	 */
	@Override
	public boolean isChannelWriteable(String addr) {
		final ChannelWrapper wrapper = this.channelTables.get(addr);
		final boolean hasActiveChannel = wrapper != null && wrapper.isOK();

		return !hasActiveChannel || wrapper.isWriteable();
	}
}
